%%--- coding:utf-8 ---
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% File Name: mc_worker_api_logic
%%% Created on : 2024/4/21 11:51
%%% @author Gaylen 252323463@qq.com
%%% @copyright (C) 2024, freedom
%%% @doc
%%%
%%% @end
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-module(mc_worker_api_logic).
-author("Gaylen").
-include("mongodb_driver.hrl").

%% API
-export([
    command/2,
    database_command/3,
    database_command/4,
    database_insert/4,
    database_update/7,
    async_database_update/10,
    database_delete/5,
    async_database_delete/8,
    database_find_one/5,
    database_ensure_index/6,
    async_database_ensure_index/9,
    database_ping/2,
    database_find/9,
    database_find_and_modify/7,
    database_kill_cursor/4,
    database_get_more/5,
    database_is_master/2,
    async_reply_callback/4
]).

command(Connection, OpQueryRec) ->
    OpReplyRec = priv_run_opquery(Connection, OpQueryRec),
    if
        OpReplyRec#mongo_opreply.res_flag_cursor_not_found =/= false ->
            {error, bad_cursor, OpReplyRec#mongo_opreply.cursor_id};
        OpReplyRec#mongo_opreply.res_flag_query_failure =/= false ->
            [ErrDoc | _] = OpReplyRec#mongo_opreply.documents,
            {error, query_failure, ErrDoc};
        true ->
            if
                OpReplyRec#mongo_opreply.number_returned =:= 1 ->
                    [Doc | _] = OpReplyRec#mongo_opreply.documents,
                    {ok, OpReplyRec#mongo_opreply.cursor_id, Doc};
                true ->
                    {ok, OpReplyRec#mongo_opreply.cursor_id, OpReplyRec#mongo_opreply.documents}
            end
    end.

-spec database_command(pid(), database(), selector()) -> {boolean(), map()}.
database_command(Connection, Database, Command) ->
    OpMsgRec = priv_run_opmsg(Connection, fun mongo_protocol:encode_command/3, [Database, Command]),
    IsOk = erlang:abs(maps:get(<<"ok">>, OpMsgRec#mongo_opmsg.document, 0) - 1.0),
    if
        IsOk < 0.0000001 ->
            {true, OpMsgRec#mongo_opmsg.document};
        true ->
            {false, OpMsgRec#mongo_opmsg.document}
    end.

-spec database_command(pid(), database(), selector(), boolean()) -> {boolean(), map()}.
database_command(Connection, Database, Command, IsSlaveOk) ->
    CommandWithSlaveFlag =
        if
            IsSlaveOk =:= true ->
                bson:update(<<"$readPreference">>, #{<<"mode">> => <<"secondaryPreferred">>}, Command);
            true ->
                Command
        end,
    database_command(Connection, Database, CommandWithSlaveFlag).

database_insert(Connection, Database, Table, Docs) ->
    OpMsgRec = priv_run_opmsg(Connection, fun mongo_protocol:encode_insert/4, [Database, Table, Docs]),
    ReplyDoc = OpMsgRec#mongo_opmsg.document,
    EffectNum = maps:get(<<"n">>, ReplyDoc, 0),
    {EffectNum > 0, ReplyDoc}.

database_update(Connection, Database, Table, Selector, Doc, IsUpInsert, IsMultiUpdate) ->
    UpdateDoc = #{
        <<"q">> => Selector,
        <<"u">> => Doc,
        <<"upsert">> => IsUpInsert,
        <<"multi">> => IsMultiUpdate
    },
    OpMsgRec = priv_run_opmsg(Connection, fun mongo_protocol:encode_update/4, [Database, Table, UpdateDoc]),
    priv_reply_update(OpMsgRec).

priv_reply_update(OpMsgRec) ->
    ReplyDoc = OpMsgRec#mongo_opmsg.document,
    EffectNum = maps:get(<<"n">>, ReplyDoc, 0),
    {EffectNum > 0, ReplyDoc}.

async_database_update(Connection, Database, Table, Selector, Doc, IsUpInsert, IsMultiUpdate, CallBackPid, CallBackFunc, CallBackArgs) ->
    UpdateDoc = #{
        <<"q">> => Selector,
        <<"u">> => Doc,
        <<"upsert">> => IsUpInsert,
        <<"multi">> => IsMultiUpdate
    },
    async_priv_run_opmsg(Connection, fun mongo_protocol:encode_update/4, [Database, Table, UpdateDoc], ?MONGO_OP_UPDATE, CallBackPid, CallBackFunc, CallBackArgs).

database_delete(Connection, Database, Table, Selector, Limit) ->
    DeleteDoc = #{
        <<"q">> => Selector,
        <<"limit">> => Limit
    },
    OpMsgRec = priv_run_opmsg(Connection, fun mongo_protocol:encode_delete/4, [Database, Table, DeleteDoc]),
    priv_reply_delete(OpMsgRec).

priv_reply_delete(OpMsgRec) ->
    ReplyDoc = OpMsgRec#mongo_opmsg.document,
    EffectNum = maps:get(<<"n">>, ReplyDoc, 0),
    {EffectNum > 0, ReplyDoc}.

async_database_delete(Connection, Database, Table, Selector, Limit, CallBackPid, CallBackFunc, CallBackArgs) ->
    DeleteDoc = #{
        <<"q">> => Selector,
        <<"limit">> => Limit
    },
    async_priv_run_opmsg(Connection, fun mongo_protocol:encode_delete/4, [Database, Table, DeleteDoc], ?MONGO_OP_DELETE, CallBackPid, CallBackFunc, CallBackArgs).

database_find_one(Connection, Database, Table, Selector, Projector) ->
    OpMsgRec = priv_run_opmsg(Connection, fun mongo_protocol:encode_find_one/5, [Database, Table, Selector, Projector]),
    IsOk = erlang:abs(maps:get(<<"ok">>, OpMsgRec#mongo_opmsg.document, 0) - 1.0),
    if
        IsOk < 0.0000001 ->
            CursorDoc = maps:get(<<"cursor">>, OpMsgRec#mongo_opmsg.document, #{}),
            case maps:get(<<"firstBatch">>, CursorDoc, [undefined]) of
                [Doc | _] -> {true, Doc};
                _ -> {false, undefined}
            end;
        true ->
            {false, error}
    end.

database_ensure_index(Connection, Database, Table, IndexSpecs, Unique, DropDups) ->
    IndexName = priv_gen_name_by_index_specs(IndexSpecs),
    OpMsgRec = priv_run_opmsg(Connection, fun mongo_protocol:encode_create_indexes/7, [Database, Table, IndexName, IndexSpecs, Unique, DropDups]),
    priv_reply_ensure_index(OpMsgRec).

priv_reply_ensure_index(OpMsgRec) ->
    ResultDoc = OpMsgRec#mongo_opmsg.document,
    IsOk = erlang:abs(maps:get(<<"ok">>, ResultDoc, 0) - 1.0),
    if
        IsOk < 0.0000001 ->
            NumIndexesAfter = maps:get(<<"numIndexesAfter">>, ResultDoc, 0),
            NumIndexesBefore = maps:get(<<"numIndexesBefore">>, ResultDoc, 0),
            if
                NumIndexesAfter > NumIndexesBefore ->
                    {true, #{<<"msg">> => <<"ok">>}};
                true ->
                    {false, maps:get(<<"note">>, ResultDoc, <<"undefined">>)}
            end;
        true ->
            {false, maps:get(<<"note">>, ResultDoc, <<"undefined">>)}
    end.

async_database_ensure_index(Connection, Database, Table, IndexSpecs, Unique, DropDups, CallBackPid, CallBackFunc, CallBackArgs) ->
    IndexName = priv_gen_name_by_index_specs(IndexSpecs),
    async_priv_run_opmsg(Connection, fun mongo_protocol:encode_create_indexes/7, [Database, Table, IndexName, IndexSpecs, Unique, DropDups], ?MONGO_OP_ENSURE_INDEX, CallBackPid, CallBackFunc, CallBackArgs).

database_ping(Connection, Database) ->
    OpMsgRec = priv_run_opmsg(Connection, fun mongo_protocol:encode_ping/2, [Database]),
    ResultDoc = OpMsgRec#mongo_opmsg.document,
    IsOk = erlang:abs(maps:get(<<"ok">>, ResultDoc, 0) - 1.0),
    if
        IsOk < 0.0000001 -> ok;
        true -> {error, ResultDoc}
    end.

database_find(Connection, Database, Table, Selector, SortDoc, Projector, Skip, Limit, IsSlave) ->
    OpMsgRec = priv_run_opmsg(Connection, fun mongo_protocol:encode_find_list/9, [Database, Table, Selector, SortDoc, Projector, Skip, Limit, IsSlave]),
    ResultDoc = OpMsgRec#mongo_opmsg.document,
    IsOk = erlang:abs(maps:get(<<"ok">>, ResultDoc, 0) - 1.0),
    if
        IsOk < 0.0000001 ->
            CursorBson = maps:get(<<"cursor">>, ResultDoc, #{}),
            FirstBatch = maps:get(<<"firstBatch">>, CursorBson, []),
            CursorId = maps:get(<<"id">>, CursorBson, 0),
            CursorPid =
                if
                    CursorId > 0 ->
                        case priv_start_cursor(Connection, Database, Table, CursorId) of
                            {ok, Pid} -> Pid;
                            _ -> undefined
                        end;
                    true ->
                        undefined
                end,
            {true, FirstBatch, CursorPid};
        true ->
            {false, ResultDoc}
    end.

database_find_and_modify(Connection, Database, Table, Selector, UpdateDoc, Projector, IsUpInsert) ->
    OpMsgRec = priv_run_opmsg(Connection, fun mongo_protocol:encode_find_and_modify/7, [Database, Table, Selector, UpdateDoc, Projector, IsUpInsert]),
    ResultDoc = OpMsgRec#mongo_opmsg.document,
    IsOk = erlang:abs(maps:get(<<"ok">>, ResultDoc, 0) - 1.0),
    if
        IsOk < 0.0000001 ->
            case maps:get(<<"value">>, ResultDoc, undefined) of
                undefined -> {false, undefined};
                Val -> {true, Val}
            end;
        true -> {false, ResultDoc}
    end.

database_kill_cursor(Connection, Database, Table, CursorId) ->
    OpMsgRec = async_priv_run_opmsg(Connection, fun mongo_protocol:encode_kill_cursors/4, [Database, Table, CursorId], ?MONGO_OP_KILL_CURSORS, undefined, undefined, undefined),
    ResultDoc = OpMsgRec#mongo_opmsg.document,
    IsOk = erlang:abs(maps:get(<<"ok">>, ResultDoc, 0) - 1.0),
    if
        IsOk < 0.0000001 -> ok;
        true -> {error, ResultDoc}
    end.

database_get_more(Connection, Database, Table, CursorId, BatchSize) ->
    OpMsgRec = priv_run_opmsg(Connection, fun mongo_protocol:encode_get_more/5, [Database, Table, CursorId, BatchSize]),
    ResultDoc = OpMsgRec#mongo_opmsg.document,
    IsOk = erlang:abs(maps:get(<<"ok">>, ResultDoc, 0) - 1.0),
    if
        IsOk < 0.0000001 ->
            CursorDoc = maps:get(<<"cursor">>, ResultDoc, #{}),
            {ok, maps:get(<<"nextBatch">>, CursorDoc, [])};
        true ->
            {error, ResultDoc}
    end.

database_is_master(Connection, Database) ->
    OpMsgRec = priv_run_opmsg(Connection, fun mongo_protocol:encode_is_master/2, [Database]),
    IsOk = erlang:abs(maps:get(<<"ok">>, OpMsgRec#mongo_opmsg.document, 0) - 1.0),
    if
        IsOk < 0.0000001 ->
            maps:get(<<"ismaster">>, OpMsgRec#mongo_opmsg.document, false);
        true ->
            false
    end.

async_reply_callback(MongoOpType, OpReplyBin, CallBackFunc, CallBackArgs) ->
    {OpMsgRec, _Bin} = mongo_protocol:decode_reply(OpReplyBin),
    Reply =
        case MongoOpType of
            ?MONGO_OP_UPDATE ->
                %% 返回 {bool, Doc}
                priv_reply_update(OpMsgRec);
            ?MONGO_OP_DELETE ->
                %% 返回 {bool, Doc}
                priv_reply_delete(OpMsgRec);
            ?MONGO_OP_ENSURE_INDEX ->
                %% 返回 {bool, Doc}
                priv_reply_ensure_index(OpMsgRec);
            _ ->
                OpMsgRec
        end,
    erlang:apply(CallBackFunc, [Reply | CallBackArgs]).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% @private
priv_run_opmsg(Connection, Func, Args) ->
    %% TODO 捕捉异常
    RequestId = mc_utils:request_id(),
    OpMsgBin = erlang:apply(Func, [RequestId | Args]),
    OpReplyBin = gen_server:call(Connection, {opmsg, RequestId, OpMsgBin}, mc_utils:get_timeout()),
    {OpMsgRec, _Bin} = mongo_protocol:decode_reply(OpReplyBin),
    OpMsgRec.

async_priv_run_opmsg(Connection, Func, Args, MongoOpType, CallBackPid, CallBackFunc, CallBackArgs) ->
    %% TODO 捕捉异常
    RequestId = mc_utils:request_id(),
    OpMsgBin = erlang:apply(Func, [RequestId | Args]),
    if
        CallBackFunc =/= undefined -> gen_server:cast(Connection, {opmsg, RequestId, OpMsgBin, MongoOpType, CallBackPid, fun mc_worker_api_logic:async_reply_callback/4, [CallBackFunc, CallBackArgs]});
        true -> gen_server:cast(Connection, {opmsg, RequestId, OpMsgBin, MongoOpType, undefined, undefined, undefined})
    end,
    {true, #{<<"ok">> => 1.0}}.

%% @private
priv_run_opquery(Connection, OpQueryRec) ->
    RequestId = mc_utils:request_id(),
    OpMsgBin = mongo_protocol:encode_opquery(OpQueryRec#mongo_opquery{request_id = RequestId}),
    OpReplyBin = gen_server:call(Connection, {query, RequestId, OpMsgBin}, mc_utils:get_timeout()),
    {OpReply, _Bin} = mongo_protocol:decode_reply(OpReplyBin),
    OpReply.

priv_gen_name_by_index_specs(IndexSpecs) ->
    priv_gen_name_by_index_specs(IndexSpecs, 0, tuple_size(IndexSpecs) div 2, <<"">>).
priv_gen_name_by_index_specs(_IndexSpecs, Max, Max, NameAcc) ->
    NameAcc;
priv_gen_name_by_index_specs(IndexSpecs, Cur, Max, NameAcc) ->
    KeyName =
        case element(Cur*2+1, IndexSpecs) of
            K1 when is_atom(K1) -> atom_to_binary(K1);
            K2 -> K2
        end,
    SortVal = integer_to_binary(element(Cur*2+2, IndexSpecs)),
    NewNameAcc = <<NameAcc/binary, KeyName/binary, <<"_">>/binary, SortVal/binary>>,
    priv_gen_name_by_index_specs(IndexSpecs, Cur + 1, Max, NewNameAcc).

priv_start_cursor(Connection, Database, Table, CursorId) ->
    mc_cursor:start_link(Connection, Database, Table, CursorId, 100).

